#ifndef BEAST_ASIO_IO_LATENCY_PROBE_H_INCLUDED
#define BEAST_ASIO_IO_LATENCY_PROBE_H_INCLUDED

#include <xrpl/beast/utility/instrumentation.h>

#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <type_traits>
#include <utility>

namespace beast {

/** Measures handler latency on an io_context queue.

    A sample is taken by posting a function object to the io_context and
    measuring the time that elapses between the post and its invocation.

    @tparam Clock Clock type providing `now()`, `duration` and `time_point`;
                  used to timestamp samples.
*/
template <class Clock>
class io_latency_probe
{
private:
    using duration = typename Clock::duration;
    using time_point = typename Clock::time_point;

    // Recursive: the sampling entry points hold the lock while constructing
    // a sample_op, whose constructor locks it again through addref().
    std::recursive_mutex m_mutex;
    // Notified by release() when m_count reaches zero.
    std::condition_variable_any m_cond;
    // Starts at 1 (the probe's own reference, dropped by the first cancel)
    // and is incremented once per constructed sample_op.
    std::size_t m_count;
    duration const m_period;
    boost::asio::io_context& m_ios;
    // NOTE(review): sample_op re-arms this timer without holding m_mutex.
    boost::asio::basic_waitable_timer<std::chrono::steady_clock> m_timer;
    bool m_cancel;

public:
    /** Create a probe.

        @param period Desired interval between samples for sample().
        @param ios The io_context whose queue latency is measured.
    */
    io_latency_probe(duration const& period, boost::asio::io_context& ios)
        : m_count(1)
        , m_period(period)
        , m_ios(ios)
        , m_timer(m_ios)
        , m_cancel(false)
    {
    }

    /** Cancel the probe and block until every outstanding sample operation
        has released its reference.
    */
    ~io_latency_probe()
    {
        std::unique_lock<decltype(m_mutex)> lock(m_mutex);
        cancel(lock, true);
    }

    /** Return the io_context associated with the latency probe. */
    /** @{ */
    boost::asio::io_context&
    get_io_context()
    {
        return m_ios;
    }

    boost::asio::io_context const&
    get_io_context() const
    {
        return m_ios;
    }
    /** @} */

    /** Cancel all pending i/o.
        Any handlers which have already been queued will still be called.

        cancel() blocks until all outstanding sample operations have been
        destroyed; cancel_async() only marks the probe as canceled.
    */
    /** @{ */
    void
    cancel()
    {
        std::unique_lock<decltype(m_mutex)> lock(m_mutex);
        cancel(lock, true);
    }

    void
    cancel_async()
    {
        std::unique_lock<decltype(m_mutex)> lock(m_mutex);
        cancel(lock, false);
    }
    /** @} */

    /** Measure one sample of i/o latency.
        Handler will be called with this signature:
            void Handler (Clock::duration d);

        The handler is copied into the queued operation.

        @throws std::logic_error if the probe has been canceled.
    */
    template <class Handler>
    void
    sample_one(Handler&& handler)
    {
        start_sample(std::forward<Handler>(handler), false);
    }

    /** Initiate continuous i/o latency sampling.
        Handler will be called with this signature:
            void Handler (Clock::duration d);

        The handler is copied into the queued operation and copied again
        each time the operation re-arms itself.

        @throws std::logic_error if the probe has been canceled.
    */
    template <class Handler>
    void
    sample(Handler&& handler)
    {
        start_sample(std::forward<Handler>(handler), true);
    }

private:
    template <class Handler>
    struct sample_op;

    /** Post the first sample operation for `handler`.

        The operation is instantiated on the decayed handler type so that it
        always owns its handler, even when the caller passes an lvalue.

        @param handler Callable invoked with the measured latency.
        @param repeat  Whether the operation re-arms itself after running.
        @throws std::logic_error if the probe has been canceled.
    */
    template <class Handler>
    void
    start_sample(Handler&& handler, bool repeat)
    {
        std::lock_guard lock(m_mutex);
        if (m_cancel)
            throw std::logic_error("io_latency_probe is canceled");
        boost::asio::post(
            m_ios,
            sample_op<std::decay_t<Handler>>(
                std::forward<Handler>(handler), Clock::now(), repeat, this));
    }

    /** Mark the probe as canceled and optionally wait for quiescence.

        @param lock A lock on m_mutex that is currently held by the caller.
        @param wait When true, block until the reference count is zero.
    */
    void
    cancel(std::unique_lock<decltype(m_mutex)>& lock, bool wait)
    {
        if (!m_cancel)
        {
            // Drop the probe's own reference exactly once.
            --m_count;
            m_cancel = true;
        }

        if (wait)
            m_cond.wait(lock, [this] { return this->m_count == 0; });
    }

    /** Add one reference on behalf of a sample operation. */
    void
    addref()
    {
        std::lock_guard lock(m_mutex);
        ++m_count;
    }

    /** Drop one reference and wake waiters when none remain. */
    void
    release()
    {
        std::lock_guard lock(m_mutex);
        if (--m_count == 0)
            m_cond.notify_all();
    }

    /** Function object queued on the io_context to take a sample.

        Each constructed operation holds one reference on the probe; the
        reference follows the operation when it is moved and is released
        by the destructor of whichever object ends up holding it.
    */
    template <class Handler>
    struct sample_op
    {
        Handler m_handler;
        time_point m_start;
        bool m_repeat;
        io_latency_probe* m_probe;

        /** @param handler Callable invoked with the measured latency.
            @param start   Time at which this operation was queued.
            @param repeat  Whether to schedule another sample afterwards.
            @param probe   Owning probe; must not be null.
        */
        sample_op(
            Handler handler,
            time_point const& start,
            bool repeat,
            io_latency_probe* probe)
            : m_handler(std::move(handler))
            , m_start(start)
            , m_repeat(repeat)
            , m_probe(probe)
        {
            XRPL_ASSERT(
                m_probe,
                "beast::io_latency_probe::sample_op::sample_op : non-null "
                "probe input");
            m_probe->addref();
        }

        /** Transfer the handler and the probe reference out of `from`.
            `from` is left with a null probe so it neither runs nor releases.
        */
        sample_op(sample_op&& from) noexcept
            : m_handler(std::move(from.m_handler))
            , m_start(from.m_start)
            , m_repeat(from.m_repeat)
            , m_probe(std::exchange(from.m_probe, nullptr))
        {
            XRPL_ASSERT(
                m_probe,
                "beast::io_latency_probe::sample_op::sample_op(sample_op&&) : "
                "non-null probe input");
        }

        sample_op(sample_op const&) = delete;
        sample_op&
        operator=(sample_op const&) = delete;
        sample_op&
        operator=(sample_op&&) = delete;

        ~sample_op()
        {
            if (m_probe)
                m_probe->release();
        }

        /** Invoked from the io_context queue: report the latency and, for a
            repeating operation on a probe that is not canceled, schedule
            the next sample.
        */
        void
        operator()() const
        {
            if (!m_probe)
                return;
            typename Clock::time_point const now(Clock::now());
            typename Clock::duration const elapsed(now - m_start);

            m_handler(elapsed);

            {
                std::lock_guard lock(m_probe->m_mutex);
                if (m_probe->m_cancel)
                    return;
            }

            if (m_repeat)
            {
                // Calculate when we want to sample again, and
                // adjust for the expected latency.
                //
                typename Clock::time_point const when(
                    now + m_probe->m_period - 2 * elapsed);

                if (when <= now)
                {
                    // The latency is too high to maintain the desired
                    // period so don't bother with a timer.
                    //
                    boost::asio::post(
                        m_probe->m_ios,
                        sample_op<Handler>(m_handler, now, m_repeat, m_probe));
                }
                else
                {
                    m_probe->m_timer.expires_after(when - now);
                    m_probe->m_timer.async_wait(
                        sample_op<Handler>(m_handler, now, m_repeat, m_probe));
                }
            }
        }

        /** Invoked when the timer wait completes: post a fresh operation
            stamped with the current time. The error code is not inspected,
            so a new sample is posted whatever the outcome of the wait.
        */
        void
        operator()(boost::system::error_code const& /*ec*/)
        {
            if (!m_probe)
                return;
            typename Clock::time_point const now(Clock::now());
            boost::asio::post(
                m_probe->m_ios,
                sample_op<Handler>(m_handler, now, m_repeat, m_probe));
        }
    };
};

}  // namespace beast

#endif
